package com.nulldev.util.internal.backport.httpclient_rw;

import java.io.IOException;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.net.Authenticator;
import java.net.ConnectException;
import java.net.CookieHandler;
import java.net.ProxySelector;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.stream.Stream;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;

import com.nulldev.util.JVM.reflections.Reflections;
import com.nulldev.util.internal.backport.concurrency9.Objects;
import com.nulldev.util.internal.backport.concurrency9.concurrent.CompletableFuture;
import com.nulldev.util.internal.backport.httpclient_rw.HttpResponse.BodyHandler;
import com.nulldev.util.internal.backport.httpclient_rw.HttpResponse.PushPromiseHandler;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.BufferSupplier;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.Log;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.Logger;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.OperationTrackers.Trackable;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.OperationTrackers.Tracker;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.Pair;
import com.nulldev.util.internal.backport.httpclient_rw.impl.common.Utils;
import com.nulldev.util.internal.backport.httpclient_rw.impl.websocket.BuilderImpl;
import com.nulldev.util.internal.backport.optionals.Optional;
import com.nulldev.util.networking.misc.TLSUtil;

/**
 * Client implementation. Contains all configuration information and also the
 * selector manager thread which allows async events to be registered and
 * delivered when they occur. See AsyncEvent.
 */
final class HttpClientImpl extends HttpClient implements Trackable {

	// Development switches: each one gates the debug logger of the same name
	// declared just below.
	static final boolean DEBUGELAPSED = Utils.TESTING || Utils.DEBUG; // dev flag
	static final boolean DEBUGTIMEOUT = false; // dev flag
	// Per-instance loggers; every message is tagged through dbgString().
	final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
	final Logger debugelapsed = Utils.getDebugLogger(this::dbgString, DEBUGELAPSED);
	final Logger debugtimeout = Utils.getDebugLogger(this::dbgString, DEBUGTIMEOUT);
	// Source of the numeric id assigned to each client instance in the constructor.
	static final AtomicLong CLIENT_IDS = new AtomicLong();

	// Define the default factory as a static inner class
	// that embeds all the necessary logic to avoid
	// the risk of using a lambda that might keep a reference on the
	// HttpClient instance from which it was created (helps with
	// heapdump analysis).
	private static final class DefaultThreadFactory implements ThreadFactory {
		// Suffix counter: the first thread produced gets index 0.
		private final AtomicInteger nextId = new AtomicInteger();
		private final String namePrefix;

		DefaultThreadFactory(long clientID) {
			this.namePrefix = "HttpClient-" + clientID + "-Worker-";
		}

		/**
		 * Creates a daemon thread named "HttpClient-&lt;clientID&gt;-Worker-&lt;n&gt;" that
		 * runs {@code r}.
		 *
		 * Without a security manager a plain {@link Thread} is created. With one
		 * installed the thread is obtained reflectively from
		 * sun.misc.InnocuousThread (presumably through its static newSystemThread
		 * factory - confirm against the Reflections helper).
		 */
		@Override
		public Thread newThread(Runnable r) {
			final String threadName = namePrefix + nextId.getAndIncrement();
			final Thread worker = System.getSecurityManager() != null
					? (Thread) new Reflections("sun.misc.InnocuousThread", false).invoke("newSystemThread", threadName, r)
					: new Thread(null, r, threadName, 0);
			worker.setDaemon(true);
			return worker;
		}
	}

	/**
	 * An executor whose scheduling depends on the calling thread: when the
	 * supplied predicate reports that the caller is the selector manager thread,
	 * the task is handed over to the wrapped executor; for any other caller the
	 * task is run inline on the calling thread.
	 */
	final static class DelegatingExecutor implements Executor {
		private final BooleanSupplier isInSelectorThread;
		private final Executor delegate;

		DelegatingExecutor(BooleanSupplier isInSelectorThread, Executor delegate) {
			this.delegate = delegate;
			this.isInSelectorThread = isInSelectorThread;
		}

		/** Returns the wrapped executor. */
		Executor delegate() {
			return this.delegate;
		}

		@Override
		public void execute(Runnable command) {
			if (!isInSelectorThread.getAsBoolean()) {
				// Not on the selector thread: run the task right here.
				command.run();
				return;
			}
			delegate.execute(command);
		}
	}

	// Configuration copied from the builder (or defaulted) in the constructor.
	private final CookieHandler cookieHandler;
	private final Duration connectTimeout;
	private final Redirect followRedirects;
	// The selector supplied through the builder, if any; proxySelector below is
	// either that one or the default selector.
	private final Optional<ProxySelector> userProxySelector;
	private final ProxySelector proxySelector;
	private final Authenticator authenticator;
	private final Version version;
	private final ConnectionPool connections;
	private final DelegatingExecutor delegatingExecutor;
	// True when no executor was supplied and a cached thread pool was created.
	private final boolean isDefaultExecutor;
	// Security parameters
	private final SSLContext sslContext;
	private final SSLParameters sslParams;
	private final SelectorManager selmgr;
	private final FilterFactory filters;
	private final Http2ClientImpl client2;
	// Unique id taken from CLIENT_IDS; also embedded in dbgTag and thread names.
	private final long id;
	private final String dbgTag;

	// The SSL DirectBuffer Supplier provides the ability to recycle
	// buffers used between the socket reader and the SSLEngine, or
	// more precisely between the SocketTube publisher and the
	// SSLFlowDelegate reader.
	private final SSLDirectBufferSupplier sslBufferSupplier = new SSLDirectBufferSupplier(this);

	// This reference is used to keep track of the facade HttpClient
	// that was returned to the application code.
	// It makes it possible to know when the application no longer
	// holds any reference to the HttpClient.
	// Unfortunately, this information is not enough to know when
	// to exit the SelectorManager thread. Because of the asynchronous
	// nature of the API, we also need to wait until all pending operations
	// have completed.
	private final WeakReference<HttpClientFacade> facadeRef;

	// This counter keeps track of the number of operations pending
	// on the HttpClient. The SelectorManager thread will wait
	// until there are no longer any pending operations and the
	// facadeRef is cleared before exiting.
	//
	// The pendingOperationCount is incremented every time a send/sendAsync
	// operation is invoked on the HttpClient, and is decremented when
	// the HttpResponse<T> object is returned to the user.
	// However, at this point, the body may not have been fully read yet.
	// This is the case when the response T is implemented as a streaming
	// subscriber (such as an InputStream).
	//
	// To take care of this issue the pendingOperationCount will additionally
	// be incremented/decremented in the following cases:
	//
	// 1. For HTTP/2 it is incremented when a stream is added to the
	// Http2Connection streams map, and decreased when the stream is removed
	// from the map. This should also take care of push promises.
	// 2. For WebSocket the count is increased when creating a
	// DetachedConnectionChannel for the socket, and decreased
	// when the channel is closed.
	// In addition, the HttpClient facade is passed to the WebSocket builder,
	// (instead of the client implementation delegate).
	// 3. For HTTP/1.1 the count is incremented before starting to parse the body
	// response, and decremented when the parser has reached the end of the
	// response body flow.
	//
	// This should ensure that the selector manager thread remains alive until
	// the response has been fully received or the web socket is closed.
	private final AtomicLong pendingOperationCount = new AtomicLong();
	// Per-category breakdown of pendingOperationCount; each is adjusted together
	// with the total by the reference/unreference style methods of this class.
	private final AtomicLong pendingWebSocketCount = new AtomicLong();
	private final AtomicLong pendingHttpRequestCount = new AtomicLong();
	private final AtomicLong pendingHttp2StreamCount = new AtomicLong();

	/** A Set of, deadline first, ordered timeout events. */
	private final TreeSet<TimeoutEvent> timeouts;

	/**
	 * This is a bit tricky: 1. an HttpClientFacade has a final HttpClientImpl
	 * field. 2. an HttpClientImpl has a final WeakReference<HttpClientFacade>
	 * field, where the referent is the facade created for that instance. 3. We
	 * cannot just create the HttpClientFacade in the HttpClientImpl constructor,
	 * because it would be only weakly referenced and could be GC'ed before we can
	 * return it. The solution is to use an instance of SingleFacadeFactory which
	 * will allow the caller of new HttpClientImpl(...) to retrieve the facade after
	 * the HttpClientImpl has been created.
	 */
	private static final class SingleFacadeFactory {
		// Strong reference to the one facade produced by this factory; it is read
		// back by the code that invoked the HttpClientImpl constructor.
		HttpClientFacade facade;

		/**
		 * Builds the facade wrapping {@code impl}, remembers it and returns it.
		 * Expected to be invoked at most once per factory (checked by assertion).
		 */
		HttpClientFacade createFacade(HttpClientImpl impl) {
			assert facade == null;
			final HttpClientFacade created = new HttpClientFacade(impl);
			this.facade = created;
			return created;
		}
	}

	/**
	 * Builds a client implementation from {@code builder}, starts it, and
	 * returns the facade that was created alongside it. The facade is kept
	 * strongly reachable by the local factory until it is returned.
	 */
	static HttpClientFacade create(HttpClientBuilderImpl builder) {
		final SingleFacadeFactory holder = new SingleFacadeFactory();
		final HttpClientImpl client = new HttpClientImpl(builder, holder);
		client.start();
		final HttpClientFacade result = holder.facade;
		assert result != null;
		assert client.facadeRef.get() == result;
		return result;
	}

	/**
	 * Initialises the client from the builder, substituting defaults for every
	 * setting the builder left null, and creates (but does not start) the
	 * selector manager thread.
	 *
	 * @param builder       source of the configuration
	 * @param facadeFactory used to create the facade; it keeps the strong
	 *                      reference, this instance only keeps a weak one
	 * @throws InternalError if the default SSLContext cannot be obtained or the
	 *                       selector manager cannot be created
	 */
	private HttpClientImpl(HttpClientBuilderImpl builder, SingleFacadeFactory facadeFactory) {
		id = CLIENT_IDS.incrementAndGet();
		dbgTag = "HttpClientImpl(" + id + ")";
		if (builder.sslContext == null) {
			try {
				sslContext = SSLContext.getDefault();
			} catch (NoSuchAlgorithmException ex) {
				throw new InternalError(ex);
			}
		} else {
			sslContext = builder.sslContext;
		}
		Executor ex = builder.executor;
		if (ex == null) {
			// No user executor: fall back to a cached pool of daemon worker threads.
			ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
			isDefaultExecutor = true;
		} else {
			isDefaultExecutor = false;
		}
		delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex);
		// Only weakly referenced from here; the factory holds the strong reference.
		facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
		client2 = new Http2ClientImpl(this);
		cookieHandler = builder.cookieHandler;
		connectTimeout = builder.connectTimeout;
		followRedirects = builder.followRedirects == null ? Redirect.NEVER : builder.followRedirects;
		this.userProxySelector = Optional.ofNullable(builder.proxy);
		this.proxySelector = userProxySelector.orElseGet(HttpClientImpl::getDefaultProxySelector);
		if (debug.on())
			debug.log("proxySelector is %s (user-supplied=%s)", this.proxySelector, userProxySelector.isPresent());
		authenticator = builder.authenticator;
		if (builder.version == null) {
			version = HttpClient.Version.HTTP_2;
		} else {
			version = builder.version;
		}
		if (builder.sslParams == null) {
			sslParams = getDefaultParams(sslContext);
		} else {
			sslParams = builder.sslParams;
		}
		connections = new ConnectionPool(id);
		connections.start();
		timeouts = new TreeSet<>();
		try {
			selmgr = new SelectorManager(this);
		} catch (IOException e) {
			// unlikely
			throw new InternalError(e);
		}
		// The selector thread is a daemon; it is started later, by start().
		selmgr.setDaemon(true);
		filters = new FilterFactory();
		initFilters();
		assert facadeRef.get() != null;
	}

	// Launches the selector manager thread. Invoked from create(...) once the
	// constructor has returned.
	private void start() {
		this.selmgr.start();
	}

	// Invoked by SelectorManager.shutdown() as the selector thread winds down.
	// Stops the HTTP/1.1 connection pool first and the HTTP/2 client second, so
	// that connections still held in either cache get closed and their
	// SocketChannels released.
	private void stop() {
		this.connections.stop(); // HTTP/1.1 cache
		this.client2.stop(); // HTTP/2 cache
	}

	/**
	 * Derives the default SSL parameters from the supported parameters of
	 * {@code ctx}, restricting the protocols to TLSv1.3 + TLSv1.2 when TLSv1.3
	 * is both supported and allowed by {@code TLSUtil.allowTLS13()}, and to
	 * TLSv1.2 alone otherwise.
	 */
	private static SSLParameters getDefaultParams(SSLContext ctx) {
		final SSLParameters params = ctx.getSupportedSSLParameters();
		final boolean supports13 = Stream.of(params.getProtocols()).anyMatch(p -> p.equals("TLSv1.3"));
		final String[] enabled;
		if (supports13 && TLSUtil.allowTLS13()) {
			enabled = new String[] { "TLSv1.3", "TLSv1.2" };
		} else {
			enabled = new String[] { "TLSv1.2" };
		}
		params.setProtocols(enabled);
		return params;
	}

	// Looks up the system-wide default ProxySelector inside a privileged block.
	private static ProxySelector getDefaultProxySelector() {
		return AccessController.doPrivileged((PrivilegedAction<ProxySelector>) ProxySelector::getDefault);
	}

	// Gives back the facade handed to application code, or null once the weak
	// reference to it has been cleared.
	final HttpClientFacade facade() {
		final HttpClientFacade current = facadeRef.get();
		return current;
	}

	// Records one more pending HTTP request: bumps the HTTP request counter,
	// then the overall pendingOperationCount, whose new value is returned.
	final long reference() {
		pendingHttpRequestCount.incrementAndGet();
		final long total = pendingOperationCount.incrementAndGet();
		return total;
	}

	// Records the completion of one HTTP request: decrements the overall
	// pendingOperationCount and the HTTP request counter, and wakes the selector
	// up when nothing is pending any more and the facade is gone. Returns the
	// new pendingOperationCount.
	final long unreference() {
		final long remaining = pendingOperationCount.decrementAndGet();
		final long http1 = pendingHttpRequestCount.decrementAndGet();
		final long http2 = pendingHttp2StreamCount.get();
		final long ws = pendingWebSocketCount.get();
		if (remaining == 0) {
			if (facade() == null) {
				selmgr.wakeupSelector();
			}
		}
		assert http1 >= 0 : "count of HTTP/1.1 operations < 0";
		assert http2 >= 0 : "count of HTTP/2 operations < 0";
		assert ws >= 0 : "count of WS operations < 0";
		assert remaining >= 0 : "count of pending operations < 0";
		return remaining;
	}

	// Records one more open HTTP/2 stream: bumps the HTTP/2 stream counter,
	// then the overall pendingOperationCount, whose new value is returned.
	final long streamReference() {
		pendingHttp2StreamCount.incrementAndGet();
		final long total = pendingOperationCount.incrementAndGet();
		return total;
	}

	// Records the removal of one HTTP/2 stream: decrements the overall
	// pendingOperationCount and the HTTP/2 stream counter, and wakes the
	// selector up when nothing is pending any more and the facade is gone.
	// Returns the new pendingOperationCount.
	final long streamUnreference() {
		final long remaining = pendingOperationCount.decrementAndGet();
		final long http2 = pendingHttp2StreamCount.decrementAndGet();
		final long http1 = pendingHttpRequestCount.get();
		final long ws = pendingWebSocketCount.get();
		if (remaining == 0) {
			if (facade() == null) {
				selmgr.wakeupSelector();
			}
		}
		assert http1 >= 0 : "count of HTTP/1.1 operations < 0";
		assert http2 >= 0 : "count of HTTP/2 operations < 0";
		assert ws >= 0 : "count of WS operations < 0";
		assert remaining >= 0 : "count of pending operations < 0";
		return remaining;
	}

	// Records one more open WebSocket: bumps the WebSocket counter, then the
	// overall pendingOperationCount, whose new value is returned.
	final long webSocketOpen() {
		pendingWebSocketCount.incrementAndGet();
		final long total = pendingOperationCount.incrementAndGet();
		return total;
	}

	// Records the closing of one WebSocket: decrements the overall
	// pendingOperationCount and the WebSocket counter, and wakes the selector up
	// when nothing is pending any more and the facade is gone. Returns the new
	// pendingOperationCount.
	final long webSocketClose() {
		final long remaining = pendingOperationCount.decrementAndGet();
		final long ws = pendingWebSocketCount.decrementAndGet();
		final long http1 = pendingHttpRequestCount.get();
		final long http2 = pendingHttp2StreamCount.get();
		if (remaining == 0) {
			if (facade() == null) {
				selmgr.wakeupSelector();
			}
		}
		assert http1 >= 0 : "count of HTTP/1.1 operations < 0";
		assert http2 >= 0 : "count of HTTP/2 operations < 0";
		assert ws >= 0 : "count of WS operations < 0";
		assert remaining >= 0 : "count of pending operations < 0";
		return remaining;
	}

	// Current value of the overall pending operation counter.
	final long referenceCount() {
		final long pending = pendingOperationCount.get();
		return pending;
	}

	/**
	 * Tracker backed by the live counters and facade reference it is given:
	 * every getter reads the current value at call time.
	 */
	final static class HttpClientTracker implements Tracker {
		final AtomicLong httpCount;
		final AtomicLong http2Count;
		final AtomicLong websocketCount;
		final AtomicLong operationsCount;
		final Reference<?> reference;
		final String name;

		HttpClientTracker(AtomicLong http, AtomicLong http2, AtomicLong ws, AtomicLong ops, Reference<?> ref, String name) {
			this.name = name;
			this.reference = ref;
			this.operationsCount = ops;
			this.websocketCount = ws;
			this.http2Count = http2;
			this.httpCount = http;
		}

		@Override
		public String getName() {
			return this.name;
		}

		@Override
		public boolean isFacadeReferenced() {
			final Object referent = this.reference.get();
			return referent != null;
		}

		@Override
		public long getOutstandingOperations() {
			return this.operationsCount.get();
		}

		@Override
		public long getOutstandingHttpOperations() {
			return this.httpCount.get();
		}

		@Override
		public long getOutstandingHttp2Streams() {
			return this.http2Count.get();
		}

		@Override
		public long getOutstandingWebSocketOperations() {
			return this.websocketCount.get();
		}
	}

	/**
	 * Creates a tracker over this client's pending-operation counters and its
	 * weak facade reference, named after this client's debug tag.
	 */
	public Tracker getOperationsTracker() {
		final Tracker tracker = new HttpClientTracker(pendingHttpRequestCount, pendingHttp2StreamCount, pendingWebSocketCount, pendingOperationCount,
				facadeRef, dbgTag);
		return tracker;
	}

	// Used by the SelectorManager thread to decide whether it may terminate:
	// the client counts as referenced while the facade is still reachable or
	// at least one operation is pending.
	final boolean isReferenced() {
		if (facade() != null) {
			return true;
		}
		return referenceCount() > 0;
	}

	/**
	 * Hands the given event to the selector manager so that it waits for
	 * activity on the event's channel. The selector manager thread then:
	 *
	 * 1) adds the event to the selector; 2) calls AsyncEvent.handle() if the
	 * selector fires for it.
	 *
	 * An exchange that needs different interest ops calls registerEvent() again.
	 */
	void registerEvent(AsyncEvent exchange) throws IOException {
		this.selmgr.register(exchange);
	}

	/**
	 * Notifies the selector manager that an AsyncEvent has changed its
	 * interestOps. Not meant for AsyncTriggerEvent (checked by assertion).
	 *
	 * @param event The modified event.
	 */
	void eventUpdated(AsyncEvent event) throws ClosedChannelException {
		assert !(event instanceof AsyncTriggerEvent);
		this.selmgr.eventUpdated(event);
	}

	// True when the caller is running on this client's selector manager thread.
	boolean isSelectorThread() {
		final Thread caller = Thread.currentThread();
		return caller == selmgr;
	}

	// Accessor for the Http2ClientImpl created in the constructor.
	Http2ClientImpl client2() {
		return this.client2;
	}

	// Logs, on the debugelapsed logger only, how many milliseconds have passed
	// since startNanos (a System.nanoTime() reading) for the given request.
	private void debugCompleted(String tag, long startNanos, HttpRequest req) {
		if (!debugelapsed.on()) {
			return;
		}
		final long elapsedMillis = (System.nanoTime() - startNanos) / 1000_000L;
		debugelapsed.log(tag + " elapsed " + elapsedMillis + " millis for " + req.method() + " to " + req.uri());
	}

	/**
	 * Sends the request and blocks until the response is available.
	 *
	 * The request is issued through the asynchronous path with no exchange
	 * executor, and the resulting future is awaited. A failure is rethrown as a
	 * fresh exception of a matching type, created on the calling thread, with
	 * the original failure attached as its cause.
	 *
	 * @throws InterruptedException     if the wait is interrupted; the pending
	 *                                  future is cancelled first
	 * @throws IllegalArgumentException if the exchange failed with one
	 * @throws SecurityException        if the exchange failed with one
	 * @throws IOException              for every other failure (including the
	 *                                  timeout and connect exceptions)
	 */
	@Override
	public <T> HttpResponse<T> send(HttpRequest req, BodyHandler<T> responseHandler) throws IOException, InterruptedException {
		CompletableFuture<HttpResponse<T>> cf = null;
		try {
			cf = sendAsync(req, responseHandler, null, null);
			return cf.get();
		} catch (InterruptedException ie) {
			if (cf != null)
				cf.cancel(true);
			throw ie;
		} catch (ExecutionException e) {
			// Fall back to the wrapper itself should it carry no cause, so the
			// message lookup below cannot fail with a NullPointerException.
			final Throwable throwable = e.getCause() != null ? e.getCause() : e;
			final String msg = throwable.getMessage();

			if (throwable instanceof IllegalArgumentException) {
				throw new IllegalArgumentException(msg, throwable);
			} else if (throwable instanceof SecurityException) {
				throw new SecurityException(msg, throwable);
			} else if (throwable instanceof HttpConnectTimeoutException) {
				HttpConnectTimeoutException hcte = new HttpConnectTimeoutException(msg);
				hcte.initCause(throwable);
				throw hcte;
			} else if (throwable instanceof HttpTimeoutException) {
				// Keep the original failure attached, like every other branch.
				HttpTimeoutException hte = new HttpTimeoutException(msg);
				hte.initCause(throwable);
				throw hte;
			} else if (throwable instanceof ConnectException) {
				ConnectException ce = new ConnectException(msg);
				ce.initCause(throwable);
				throw ce;
			} else {
				// IOException and anything unexpected are both surfaced as IOException.
				throw new IOException(msg, throwable);
			}
		}
	}

	// The default executor of the backported CompletableFuture; used by
	// sendAsync(...) to run dependent actions of the returned future.
	private static final Executor ASYNC_POOL = new CompletableFuture<Void>().defaultExecutor();

	/** Asynchronous send without a push promise handler. */
	@Override
	public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler) {
		final PushPromiseHandler<T> noPushHandler = null;
		return sendAsync(userRequest, responseHandler, noPushHandler);
	}

	/**
	 * Asynchronous send. The executor wrapped by the delegating executor is
	 * passed explicitly as the exchange executor.
	 */
	@Override
	public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler, PushPromiseHandler<T> pushPromiseHandler) {
		final Executor exchangeExecutor = delegatingExecutor.delegate();
		return sendAsync(userRequest, responseHandler, pushPromiseHandler, exchangeExecutor);
	}

	/**
	 * Common implementation behind send(...) and the public sendAsync(...)
	 * overloads.
	 *
	 * @param userRequest        the request supplied by the caller; must not be null
	 * @param responseHandler    must not be null
	 * @param pushPromiseHandler may be null
	 * @param exchangeExecutor   executor for the exchange, or null to use the
	 *                           client's delegating executor
	 * @return a future completed with the response or the failure
	 * @throws IllegalArgumentException if the request method is CONNECT
	 */
	private <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler, PushPromiseHandler<T> pushPromiseHandler,
			Executor exchangeExecutor) {

		Objects.requireNonNull(userRequest);
		Objects.requireNonNull(responseHandler);

		// The caller's access control context is only captured when a security
		// manager is installed.
		AccessControlContext acc = null;
		if (System.getSecurityManager() != null)
			acc = AccessController.getContext();

		// Clone the, possibly untrusted, HttpRequest
		HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector);
		if (requestImpl.method().equals("CONNECT"))
			throw new IllegalArgumentException("Unsupported method CONNECT");

		long start = DEBUGELAPSED ? System.nanoTime() : 0;
		// Count this request as pending; balanced by unreference() either when
		// the response future completes or in the catch block below.
		reference();
		try {
			if (debugelapsed.on())
				debugelapsed.log("ClientImpl (async) send %s", userRequest);

			// When using sendAsync(...) we explicitly pass the
			// executor's delegate as exchange executor to force
			// asynchronous scheduling of the exchange.
			// When using send(...) we don't specify any executor
			// and default to using the client's delegating executor
			// which only spawns asynchronous tasks if it detects
			// that the current thread is the selector manager
			// thread. This will cause everything to execute inline
			// until we need to schedule some event with the selector.
			Executor executor = exchangeExecutor == null ? this.delegatingExecutor : exchangeExecutor;

			MultiExchange<T> mex = new MultiExchange<>(userRequest, requestImpl, this, responseHandler, pushPromiseHandler, acc);
			CompletableFuture<HttpResponse<T>> res = mex.responseAsync(executor).whenComplete((b, t) -> unreference());
			if (DEBUGELAPSED) {
				res = res.whenComplete((b, t) -> debugCompleted("ClientImpl (async)", start, userRequest));
			}

			// makes sure that any dependent actions happen in the CF default
			// executor. This is only needed for sendAsync(...), when
			// exchangeExecutor is non-null.
			if (exchangeExecutor != null) {
				res = res.whenCompleteAsync((r, t) -> {
					/* do nothing */}, ASYNC_POOL);
			}
			return res;
		} catch (Throwable t) {
			// Setting up the exchange failed synchronously: undo the reference()
			// above before propagating.
			unreference();
			debugCompleted("ClientImpl (async)", start, userRequest);
			throw t;
		}
	}

	// Main loop for this client's selector
	private final static class SelectorManager extends Thread {

		// For testing purposes we have an internal System property that
		// can control the frequency at which the selector manager will wake
		// up when there are no pending operations.
		// Increasing the frequency (shorter delays) might allow the selector
		// to observe that the facade is no longer referenced and might allow
		// the selector thread to terminate more timely - for when nothing is
		// ongoing it will only check for that condition every NODEADLINE ms.
		// To avoid misuse of the property, the delay that can be specified
		// is comprised between [MIN_NODEADLINE, MAX_NODEADLINE], and its default
		// value if unspecified (or <= 0) is DEF_NODEADLINE = 3000ms
		// The property is -Djdk.internal.httpclient.selectorTimeout=<millis>
		private static final int MIN_NODEADLINE = 1000; // ms
		private static final int MAX_NODEADLINE = 1000 * 1200; // ms
		private static final int DEF_NODEADLINE = 3000; // ms
		private static final long NODEADLINE; // default is DEF_NODEADLINE ms
		static {
			// ensure NODEADLINE is initialized with some valid value.
			long deadline = Utils.getIntegerProperty("jdk.internal.httpclient.selectorTimeout", DEF_NODEADLINE); // millis
			// Non-positive values fall back to the default ...
			if (deadline <= 0)
				deadline = DEF_NODEADLINE;
			// ... and everything else is clamped into [MIN_NODEADLINE, MAX_NODEADLINE].
			deadline = Math.max(deadline, MIN_NODEADLINE);
			NODEADLINE = Math.min(deadline, MAX_NODEADLINE);
		}

		private final Selector selector;
		// Set by shutdown(); once true, handleEvent aborts events instead of
		// handling them.
		private volatile boolean closed;
		// Events queued by register(...) and drained by run() under this
		// object's monitor.
		private final List<AsyncEvent> registrations;
		// Events whose handle() is invoked by run() at the start of each loop
		// iteration.
		private final List<AsyncTriggerEvent> deregistrations;
		// Loggers shared with the owning client.
		private final Logger debug;
		private final Logger debugtimeout;
		// The client this thread serves, and that client's connection pool.
		HttpClientImpl owner;
		ConnectionPool pool;

		/**
		 * Creates the (not yet started) selector thread for the given client,
		 * named "HttpClient-&lt;id&gt;-SelectorManager", and opens its selector.
		 *
		 * @throws IOException if the selector cannot be opened
		 */
		SelectorManager(HttpClientImpl ref) throws IOException {
			super(null, null, "HttpClient-" + ref.id + "-SelectorManager", 0);
			this.owner = ref;
			this.pool = ref.connectionPool();
			this.debug = ref.debug;
			this.debugtimeout = ref.debugtimeout;
			this.registrations = new ArrayList<>();
			this.deregistrations = new ArrayList<>();
			this.selector = Selector.open();
		}

		/**
		 * Takes a change of interest ops of {@code e} into account.
		 *
		 * Off the selector thread this simply queues the event through
		 * register(e). On the selector thread the event is registered directly
		 * with the attachment of the channel's selection key.
		 */
		void eventUpdated(AsyncEvent e) throws ClosedChannelException {
			if (Thread.currentThread() != this) {
				register(e);
				return;
			}
			final SelectionKey key = e.channel().keyFor(selector);
			final boolean usable = key != null && key.isValid();
			if (usable) {
				((SelectorAttachment) key.attachment()).register(e);
				return;
			}
			if (e.interestOps() == 0) {
				// We don't care about paused events.
				// These are actually handled by
				// SelectorAttachment::resetInterestOps later on.
				return;
			}
			// Reaching this point while trying to resume an event means there is
			// no usable key: fail fast.
			if (debug.on())
				debug.log("No key for channel");
			e.abort(new IOException("No key for channel"));
		}

		// Queues the event for the selector thread and wakes the selector up.
		// Returns immediately, so the caller is not allowed to send/receive on
		// the connection yet.
		synchronized void register(AsyncEvent e) {
			this.registrations.add(e);
			this.selector.wakeup();
		}

		// Cancels the selection key of the given channel for this selector, if
		// there is one, then wakes the selector up.
		@SuppressWarnings("unused")
		synchronized void cancel(SocketChannel e) {
			final SelectionKey registered = e.keyFor(selector);
			if (registered != null) {
				registered.cancel();
			}
			selector.wakeup();
		}

		// Makes a pending or upcoming select(...) on the selector return.
		void wakeupSelector() {
			this.selector.wakeup();
		}

		/**
		 * Marks this manager as closed, closes the selector and, whatever the
		 * outcome of the close, stops the owning client's connection caches.
		 */
		synchronized void shutdown() {
			Log.logTrace("{0}: shutting down", getName());
			if (debug.on()) {
				debug.log("SelectorManager shutting down");
			}
			this.closed = true;
			try {
				this.selector.close();
			} catch (IOException ignored) {
				// best effort: nothing useful can be done if close fails
			} finally {
				this.owner.stop();
			}
		}

		/**
		 * Main loop of the selector thread. Each iteration:
		 * drains the queued registrations (under this object's monitor),
		 * handles trigger events and registration failures, exits if the owning
		 * client is no longer referenced, computes how long to block from the
		 * next timeout / keep-alive deadlines, selects, and finally dispatches
		 * the events of the selected keys. Whatever the way out, shutdown() is
		 * invoked last.
		 */
		@Override
		public void run() {
			List<Pair<AsyncEvent, IOException>> errorList = new ArrayList<>();
			List<AsyncEvent> readyList = new ArrayList<>();
			List<Runnable> resetList = new ArrayList<>();
			try {
				if (Log.channel())
					Log.logChannel(getName() + ": starting");
				while (!Thread.currentThread().isInterrupted()) {
					// Same monitor as register(...): no event can be queued while
					// the pending registrations are being drained.
					synchronized (this) {
						assert errorList.isEmpty();
						assert readyList.isEmpty();
						assert resetList.isEmpty();
						for (AsyncTriggerEvent event : deregistrations) {
							event.handle();
						}
						deregistrations.clear();
						for (AsyncEvent event : registrations) {
							// Trigger events are not registered with the selector;
							// they are handled right after this synchronized block.
							if (event instanceof AsyncTriggerEvent) {
								readyList.add(event);
								continue;
							}
							SelectableChannel chan = event.channel();
							SelectionKey key = null;
							try {
								key = chan.keyFor(selector);
								SelectorAttachment sa;
								if (key == null || !key.isValid()) {
									if (key != null) {
										// key is canceled.
										// invoke selectNow() to purge it
										// before registering the new event.
										selector.selectNow();
									}
									sa = new SelectorAttachment(chan, selector);
								} else {
									sa = (SelectorAttachment) key.attachment();
								}
								// may throw IOE if channel closed: that's OK
								sa.register(event);
								if (!chan.isOpen()) {
									throw new IOException("Channel closed");
								}
							} catch (IOException e) {
								Log.logTrace("{0}: {1}", getName(), e);
								if (debug.on())
									debug.log("Got " + e.getClass().getName() + " while handling registration events");
								chan.close();
								// let the event abort deal with it
								errorList.add(new Pair<>(event, e));
								if (key != null) {
									key.cancel();
									selector.selectNow();
								}
							}
						}
						registrations.clear();
						selector.selectedKeys().clear();
					}

					// At this point readyList only holds the trigger events
					// collected above.
					for (AsyncEvent event : readyList) {
						assert event instanceof AsyncTriggerEvent;
						event.handle();
					}
					readyList.clear();

					for (Pair<AsyncEvent, IOException> error : errorList) {
						// an IOException was raised and the channel closed.
						handleEvent(error.first, error.second);
					}
					errorList.clear();

					// Check whether client is still alive, and if not,
					// gracefully stop this thread
					if (!owner.isReferenced()) {
						Log.logTrace("{0}: {1}", getName(), "HttpClient no longer referenced. Exiting...");
						return;
					}

					// Timeouts will have milliseconds granularity. It is important
					// to handle them in a timely fashion.
					long nextTimeout = owner.purgeTimeoutsAndReturnNextDeadline();
					if (debugtimeout.on())
						debugtimeout.log("next timeout: %d", nextTimeout);

					// Keep-alive have seconds granularity. It's not really an
					// issue if we keep connections linger a bit more in the keep
					// alive cache.
					long nextExpiry = pool.purgeExpiredConnectionsAndReturnNextDeadline();
					if (debugtimeout.on())
						debugtimeout.log("next expired: %d", nextExpiry);

					assert nextTimeout >= 0;
					assert nextExpiry >= 0;

					// Don't wait for ever as it might prevent the thread to
					// stop gracefully. millis will be 0 if no deadline was found.
					if (nextTimeout <= 0)
						nextTimeout = NODEADLINE;

					// Clip nextExpiry at NODEADLINE limit. The default
					// keep alive is 1200 seconds (20 minutes) - we don't
					// want to wait that long.
					if (nextExpiry <= 0)
						nextExpiry = NODEADLINE;
					else
						nextExpiry = Math.min(NODEADLINE, nextExpiry);

					// takes the least of the two.
					long millis = Math.min(nextExpiry, nextTimeout);

					if (debugtimeout.on())
						debugtimeout.log("Next deadline is %d", (millis == 0 ? NODEADLINE : millis));
					// debugPrint(selector);
					int n = selector.select(millis == 0 ? NODEADLINE : millis);
					if (n == 0) {
						// Check whether client is still alive, and if not,
						// gracefully stop this thread
						if (!owner.isReferenced()) {
							Log.logTrace("{0}: {1}", getName(), "HttpClient no longer referenced. Exiting...");
							return;
						}
						owner.purgeTimeoutsAndReturnNextDeadline();
						continue;
					}

					Set<SelectionKey> keys = selector.selectedKeys();
					assert errorList.isEmpty();

					// Sort the selected keys into events to handle (readyList),
					// events to abort (errorList) and interest ops to reset
					// afterwards (resetList).
					for (SelectionKey key : keys) {
						SelectorAttachment sa = (SelectorAttachment) key.attachment();
						if (!key.isValid()) {
							IOException ex = sa.chan.isOpen() ? new IOException("Invalid key") : new ClosedChannelException();
							sa.pending.forEach(e -> errorList.add(new Pair<>(e, ex)));
							sa.pending.clear();
							continue;
						}

						int eventsOccurred;
						try {
							eventsOccurred = key.readyOps();
						} catch (CancelledKeyException ex) {
							IOException io = Utils.getIOException(ex);
							sa.pending.forEach(e -> errorList.add(new Pair<>(e, io)));
							sa.pending.clear();
							continue;
						}
						sa.events(eventsOccurred).forEach(readyList::add);
						resetList.add(() -> sa.resetInterestOps(eventsOccurred));
					}

					selector.selectNow(); // complete cancellation
					selector.selectedKeys().clear();

					// handle selected events
					readyList.forEach((e) -> handleEvent(e, null));
					readyList.clear();

					// handle errors (closed channels etc...)
					errorList.forEach((p) -> handleEvent(p.first, p.second));
					errorList.clear();

					// reset interest ops for selected channels
					resetList.forEach(r -> r.run());
					resetList.clear();

				}
			} catch (Throwable e) {
				if (!closed) {
					// This terminates thread. So, better just print stack trace
					String err = Utils.stackTrace(e);
					Log.logError("{0}: {1}: {2}", getName(), "HttpClientImpl shutting down due to fatal error", err);
				}
				if (debug.on())
					debug.log("shutting down", e);
				if (Utils.ASSERTIONSENABLED && !debug.on()) {
					e.printStackTrace(System.err); // always print the stack
				}
			} finally {
				// Reached on normal exit, on interruption and after a fatal error.
				if (Log.channel())
					Log.logChannel(getName() + ": stopping");
				shutdown();
			}
		}

//        void debugPrint(Selector selector) {
//            System.err.println("Selector: debugprint start");
//            Set<SelectionKey> keys = selector.keys();
//            for (SelectionKey key : keys) {
//                SelectableChannel c = key.channel();
//                int ops = key.interestOps();
//                System.err.printf("selector chan:%s ops:%d\n", c, ops);
//            }
//            System.err.println("Selector: debugprint end");
//        }

		/**
		 * Dispatches a single event. The event is handled normally only when the
		 * {@code closed} flag is not set and no error was supplied; in every other
		 * case it is aborted with {@code ioe}.
		 *
		 * @param event the event to dispatch
		 * @param ioe   the error to abort the event with; may be {@code null}
		 */
		void handleEvent(AsyncEvent event, IOException ioe) {
			final boolean proceed = !closed && ioe == null;
			if (proceed) {
				event.handle();
				return;
			}
			event.abort(ioe);
		}
	}

	/**
	 * Produces a diagnostic description of how the given channel is currently
	 * registered with this client's selector. This method does not throw: any
	 * failure encountered while inspecting the registration is returned as the
	 * string form of the throwable.
	 *
	 * @param channel the channel to describe
	 * @return a human readable description of the registration state
	 */
	final String debugInterestOps(SelectableChannel channel) {
		try {
			final SelectionKey registration = channel.keyFor(selmgr.selector);
			if (registration == null) {
				return "channel not registered with selector";
			}
			final String keyState;
			if (registration.isValid()) {
				keyState = "key.interestOps=" + registration.interestOps();
			} else {
				keyState = "invalid key";
			}
			final SelectorAttachment attachment = (SelectorAttachment) registration.attachment();
			return String.format("channel registered with selector, %s, sa.interestOps=%s", keyState,
					attachment.interestOps);
		} catch (Throwable failure) {
			return String.valueOf(failure);
		}
	}

	/**
	 * Tracks multiple user level registrations associated with one NIO registration
	 * (SelectionKey). In this implementation, registrations of non-repeating events
	 * are one-off: when such an event is posted its registration is removed until
	 * it is explicitly registered again. Events reporting {@code repeating()} are
	 * kept across resets.
	 *
	 * <p>
	 * No external synchronization required as this class is only used by the
	 * SelectorManager thread. One of these objects required per connection.
	 */
	private static class SelectorAttachment {
		// The channel whose selector registration this attachment tracks.
		private final SelectableChannel chan;
		// The selector the channel gets registered with.
		private final Selector selector;
		// User level events currently waiting on the channel.
		private final Set<AsyncEvent> pending;
		private final static Logger debug = Utils.getDebugLogger("SelectorAttachment"::toString, Utils.DEBUG);
		// Union of the interest ops tracked for the pending events.
		private int interestOps;

		/**
		 * Creates an attachment for the given channel and selector, with no pending
		 * events. The channel is not registered here; see {@link #register}.
		 */
		SelectorAttachment(SelectableChannel chan, Selector selector) {
			this.pending = new HashSet<>();
			this.chan = chan;
			this.selector = selector;
		}

		/**
		 * Adds the given event to the pending set and merges its interest ops into
		 * the tracked ops. If the event brings ops that were not tracked yet, the
		 * channel is (re)registered with the selector using the merged ops and this
		 * object as attachment.
		 *
		 * <p>
		 * Failures do not propagate from the registration attempt: any throwable
		 * raised by it aborts all pending events instead. When no re-registration is
		 * needed but the channel is no longer open, all pending events are aborted
		 * with a {@code ClosedChannelException}.
		 *
		 * @param e the event to register
		 */
		void register(AsyncEvent e) throws ClosedChannelException {
			int newOps = e.interestOps();
			// re register interest if we are not already interested
			// in the event. If the event is paused, then the pause will
			// be taken into account later when resetInterestOps is called.
			boolean reRegister = (interestOps & newOps) != newOps;
			interestOps |= newOps;
			// the event is added before registering, so that a failed
			// registration aborts it along with the other pending events.
			pending.add(e);
			if (debug.on())
				debug.log("Registering %s for %d (%s)", e, newOps, reRegister);
			if (reRegister) {
				// first time registration happens here also
				try {
					chan.register(selector, interestOps, this);
				} catch (Throwable x) {
					abortPending(x);
				}
			} else if (!chan.isOpen()) {
				abortPending(new ClosedChannelException());
			}
		}

		/**
		 * Returns a {@code Stream<AsyncEvent>} containing only the pending events
		 * whose interest ops have at least one bit in common with the given
		 * {@code interestOps}.
		 */
		Stream<AsyncEvent> events(int interestOps) {
			return pending.stream().filter(ev -> (ev.interestOps() & interestOps) != 0);
		}

		/**
		 * Removes any non-repeating events matching the given {@code interestOps},
		 * recomputes the tracked interest ops from the events that remain, and if no
		 * events are remaining, cancels the associated SelectionKey. Otherwise the
		 * key's interest ops are updated; if the key is missing, invalid or
		 * cancelled, or the channel turns out to be closed, all pending events are
		 * aborted.
		 */
		void resetInterestOps(int interestOps) {
			int newOps = 0;

			Iterator<AsyncEvent> itr = pending.iterator();
			while (itr.hasNext()) {
				AsyncEvent event = itr.next();
				int evops = event.interestOps();
				// repeating events stay registered whether or not they fired
				if (event.repeating()) {
					newOps |= evops;
					continue;
				}
				if ((evops & interestOps) != 0) {
					itr.remove();
				} else {
					newOps |= evops;
				}
			}

			this.interestOps = newOps;
			SelectionKey key = chan.keyFor(selector);
			if (newOps == 0 && key != null && pending.isEmpty()) {
				key.cancel();
			} else {
				try {
					// a missing or invalid key is handled like a cancelled one
					if (key == null || !key.isValid()) {
						throw new CancelledKeyException();
					}
					key.interestOps(newOps);
					// double check after
					if (!chan.isOpen()) {
						abortPending(new ClosedChannelException());
						return;
					}
					assert key.interestOps() == newOps;
				} catch (CancelledKeyException x) {
					// channel may have been closed
					if (debug.on())
						debug.log("key cancelled for " + chan);
					abortPending(x);
				}
			}
		}

		/**
		 * Aborts every pending event with the {@code IOException} obtained from
		 * {@code Utils.getIOException(x)}. Does nothing if there is no pending event.
		 *
		 * @param x the cause of the abort
		 */
		void abortPending(Throwable x) {
			if (!pending.isEmpty()) {
				// snapshot then clear: the set is already empty when the events
				// get notified (presumably to tolerate callbacks that touch this
				// attachment again - TODO confirm).
				AsyncEvent[] evts = pending.toArray(new AsyncEvent[0]);
				pending.clear();
				IOException io = Utils.getIOException(x);
				for (AsyncEvent event : evts) {
					event.abort(io);
				}
			}
		}
	}

	/**
	 * Package-private accessor for the {@code SSLContext} held by this client.
	 *
	 * @return the value of the {@code sslContext} field
	 */
	/* package-private */ SSLContext theSSLContext() {
		return this.sslContext;
	}

	/**
	 * Returns the {@code SSLContext} held by this client.
	 *
	 * @return the value of the {@code sslContext} field
	 */
	@Override
	public SSLContext sslContext() {
		return this.sslContext;
	}

	/**
	 * Returns the result of {@code Utils.copySSLParameters} applied to this
	 * client's SSL parameters, rather than the stored instance itself.
	 *
	 * @return the SSL parameters as returned by the copy helper
	 */
	@Override
	public SSLParameters sslParameters() {
		final SSLParameters copy = Utils.copySSLParameters(this.sslParams);
		return copy;
	}

	/**
	 * Returns this client's authenticator wrapped in an {@code Optional}.
	 *
	 * @return an {@code Optional} holding the authenticator, empty when the
	 *         {@code authenticator} field is {@code null}
	 */
	@Override
	public Optional<Authenticator> authenticator() {
		return Optional.ofNullable(this.authenticator);
	}

	/**
	 * Package-private accessor for the delegating executor held by this client.
	 *
	 * @return the value of the {@code delegatingExecutor} field
	 */
	/* package-private */ final DelegatingExecutor theExecutor() {
		return this.delegatingExecutor;
	}

	/**
	 * Returns the executor that the delegating executor forwards to, unless the
	 * {@code isDefaultExecutor} flag is set.
	 *
	 * @return an empty {@code Optional} when {@code isDefaultExecutor} is set,
	 *         otherwise an {@code Optional} holding the delegate executor
	 */
	@Override
	public final Optional<Executor> executor() {
		if (isDefaultExecutor) {
			return Optional.empty();
		}
		return Optional.of(delegatingExecutor.delegate());
	}

	/**
	 * Returns the connection pool held by this client.
	 *
	 * @return the value of the {@code connections} field
	 */
	ConnectionPool connectionPool() {
		return this.connections;
	}

	/**
	 * Returns the redirect setting held by this client.
	 *
	 * @return the value of the {@code followRedirects} field
	 */
	@Override
	public Redirect followRedirects() {
		return this.followRedirects;
	}

	/**
	 * Returns this client's cookie handler wrapped in an {@code Optional}.
	 *
	 * @return an {@code Optional} holding the cookie handler, empty when the
	 *         {@code cookieHandler} field is {@code null}
	 */
	@Override
	public Optional<CookieHandler> cookieHandler() {
		return Optional.ofNullable(this.cookieHandler);
	}

	/**
	 * Returns this client's connect timeout wrapped in an {@code Optional}.
	 *
	 * @return an {@code Optional} holding the connect timeout, empty when the
	 *         {@code connectTimeout} field is {@code null}
	 */
	@Override
	public Optional<Duration> connectTimeout() {
		return Optional.ofNullable(this.connectTimeout);
	}

	/**
	 * Returns the value of the {@code userProxySelector} field as is. See
	 * {@code proxySelector()} for the effective selector used by this client.
	 *
	 * @return the stored {@code Optional} proxy selector
	 */
	@Override
	public Optional<ProxySelector> proxy() {
		return userProxySelector;
	}

	/**
	 * Returns the effective proxy selector that this client uses.
	 *
	 * @return the value of the {@code proxySelector} field
	 */
	ProxySelector proxySelector() {
		return this.proxySelector;
	}

	/**
	 * Creates a new WebSocket builder bound to this client's facade and to its
	 * effective proxy selector.
	 *
	 * @return a new {@code BuilderImpl}
	 */
	@Override
	public WebSocket.Builder newWebSocketBuilder() {
		// The builder is handed the HttpClientFacade, not this implementation
		// object: this ensures that the facade is not released before the
		// WebSocket has been created. By then the pendingOperationCount
		// will have been incremented by the RawChannelTube.
		// See RawChannelTube.
		return new BuilderImpl(facade(), this.proxySelector);
	}

	/**
	 * Returns the HTTP version setting held by this client.
	 *
	 * @return the value of the {@code version} field
	 */
	@Override
	public Version version() {
		return this.version;
	}

	/**
	 * Returns the debug tag of this client.
	 *
	 * @return the value of the {@code dbgTag} field
	 */
	String dbgString() {
		return this.dbgTag;
	}

	/**
	 * Returns the inherited string form followed by this client's id in
	 * parentheses.
	 */
	@Override
	public String toString() {
		// Tests rely on this format to extract the client's id and derive
		// the name of the SelectorManager thread.
		final String base = super.toString();
		return base + "(" + id + ")";
	}

	/**
	 * Installs the header filters of this client: the authentication filter,
	 * then the redirect filter, and finally the cookie filter, which is only
	 * added when a cookie handler is set.
	 */
	private void initFilters() {
		addFilter(AuthenticationFilter.class);
		addFilter(RedirectFilter.class);
		if (cookieHandler == null) {
			// no cookie handler: nothing more to install
			return;
		}
		addFilter(CookieFilter.class);
	}

	/**
	 * Adds the given filter class to this client's filter factory.
	 *
	 * @param f the filter class to add
	 */
	private void addFilter(Class<? extends HeaderFilter> f) {
		this.filters.addFilter(f);
	}

	/**
	 * Returns the filter chain obtained from this client's filter factory.
	 *
	 * @return the list returned by {@code filters.getFilterChain()}
	 */
	final LinkedList<HeaderFilter> filterChain() {
		return this.filters.getFilterChain();
	}

	// Timer controls.
	// Timers are implemented through timed Selector.select() calls.

	/**
	 * Adds the given timeout event to the registered timeouts and wakes up the
	 * selector (presumably so that the selector loop takes the new deadline into
	 * account). Runs while holding this client's monitor.
	 *
	 * @param event the timeout event to register
	 */
	void registerTimer(TimeoutEvent event) {
		synchronized (this) {
			Log.logTrace("Registering timer {0}", event);
			this.timeouts.add(event);
			this.selmgr.wakeupSelector();
		}
	}

	/**
	 * Removes the given timeout event from the registered timeouts. Runs while
	 * holding this client's monitor.
	 *
	 * @param event the timeout event to cancel
	 */
	void cancelTimer(TimeoutEvent event) {
		synchronized (this) {
			Log.logTrace("Canceling timer {0}", event);
			this.timeouts.remove(event);
		}
	}

	/**
	 * Fires the timeout events whose deadline has been reached and reports how
	 * long, in milliseconds, remains until the first event that is not yet due.
	 *
	 * <p>
	 * The due events are collected and removed while holding this client's
	 * monitor, and handled after the monitor has been released. If handling an
	 * event throws an {@code Error} or {@code RuntimeException}, the remaining
	 * events are still handled; the first failure is then rethrown, with any
	 * later failures attached as suppressed.
	 *
	 * @return the time to wait until the next event, in milliseconds; 0 when no
	 *         timeout is registered or when the last deadline examined has
	 *         already been reached
	 */
	private long purgeTimeoutsAndReturnNextDeadline() {
		long millisToNext = 0L;
		List<TimeoutEvent> expired = null;
		int stillRegistered = 0;
		// critical section: pick and remove the events that are due
		synchronized (this) {
			if (timeouts.isEmpty()) {
				return 0L;
			}

			final Instant now = Instant.now();
			for (Iterator<TimeoutEvent> it = timeouts.iterator(); it.hasNext();) {
				final TimeoutEvent candidate = it.next();
				millisToNext = now.until(candidate.deadline(), ChronoUnit.MILLIS);
				if (millisToNext > 0) {
					// first event that is not due yet: stop scanning
					break;
				}
				it.remove();
				if (expired == null) {
					expired = new ArrayList<>();
				}
				expired.add(candidate);
			}
			stillRegistered = timeouts.size();
		}

		// time to wait until next event. 0L if there's no more events.
		final long nextDeadline = millisToNext < 0 ? 0L : millisToNext;

		if (expired != null) {
			// can be useful for debugging
			if (Log.trace()) {
				Log.logTrace("purgeTimeoutsAndReturnNextDeadline: handling " + expired.size() + " events, " + "remaining " + stillRegistered
						+ ", next deadline: " + nextDeadline);
			}

			// the events are handled outside of the critical section
			Throwable firstFailure = null;
			for (TimeoutEvent due : expired) {
				try {
					Log.logTrace("Firing timer {0}", due);
					due.handle();
				} catch (Error | RuntimeException problem) {
					// Not expected. Keep going with the remaining events and
					// rethrow at the end. Should the problem be an OOME or SOE,
					// a new error may well be raised from here - in which case
					// there is little we could do anyway. Just let it flow...
					if (firstFailure != null) {
						firstFailure.addSuppressed(problem);
					} else {
						firstFailure = problem;
					}
					Log.logTrace("Failed to handle event {0}: {1}", due, problem);
				}
			}
			if (firstFailure instanceof Error) {
				throw (Error) firstFailure;
			}
			if (firstFailure instanceof RuntimeException) {
				throw (RuntimeException) firstFailure;
			}
		}

		return nextDeadline;
	}

	/**
	 * Reads the {@code jdk.httpclient.receiveBufferSize} net property, falling
	 * back to 0. Used for the connection window.
	 *
	 * @return the value obtained from {@code Utils.getIntegerNetProperty}
	 */
	int getReceiveBufferSize() {
		// a default of 0 stands for: only set the size if > 0
		final int unsetDefault = 0;
		return Utils.getIntegerNetProperty("jdk.httpclient.receiveBufferSize", unsetDefault);
	}

	// Optimization for reading SSL encrypted data
	// --------------------------------------------

	/**
	 * Returns a BufferSupplier that can be used for reading encrypted bytes off
	 * the channel. Once their content has been copied into the
	 * SSLFlowDelegate::Reader readBuf, these buffers can be recycled by the
	 * SSLFlowDelegate::Reader.
	 *
	 * <p>
	 * Allocating, reading, copying, and recycling all happen in the
	 * SelectorManager thread, which is why a single BufferSupplier can be shared
	 * between all the SSL connections managed by this client.
	 *
	 * @return the value of the {@code sslBufferSupplier} field
	 */
	BufferSupplier getSSLBufferSupplier() {
		return this.sslBufferSupplier;
	}

	/**
	 * A {@link BufferSupplier} managing a pool of at most
	 * {@code SocketTube.MAX_BUFFERS} direct byte buffers. The buffers are used
	 * for reading encrypted bytes off the channel before copying and subsequent
	 * unwrapping.
	 *
	 * <p>
	 * The pool is a stack stored in an array: {@code tail} is the number of
	 * buffers currently held, and also the index of the next free slot.
	 */
	private static final class SSLDirectBufferSupplier implements BufferSupplier {
		private static final int POOL_SIZE = SocketTube.MAX_BUFFERS;
		private final ByteBuffer[] pool = new ByteBuffer[POOL_SIZE];
		private final HttpClientImpl client;
		private final Logger debug;
		// no need for volatile: only accessed in SM thread.
		private int tail;
		// number of buffers allocated so far; only maintained by an assertion
		private int count;

		/**
		 * Creates a supplier for the given client, sharing the client's debug
		 * logger.
		 *
		 * @param client the owning client; must not be {@code null}
		 */
		SSLDirectBufferSupplier(HttpClientImpl client) {
			this.client = Objects.requireNonNull(client);
			this.debug = client.debug;
		}

		/**
		 * Gets a buffer from the pool, or allocates a new direct buffer of
		 * {@code Utils.BUFSIZE} bytes when the pool is empty.
		 */
		@Override
		public ByteBuffer get() {
			assert client.isSelectorThread();
			assert tail <= POOL_SIZE : "allocate tail is " + tail;
			final ByteBuffer result;
			if (tail != 0) {
				// pop the most recently recycled buffer
				assert tail > 0 : "non positive tail value: " + tail;
				final int slot = --tail;
				result = pool[slot];
				pool[slot] = null;
			} else {
				if (debug.on()) {
					// should not appear more than SocketTube.MAX_BUFFERS
					debug.log("ByteBuffer.allocateDirect(%d)", Utils.BUFSIZE);
				}
				assert count++ < POOL_SIZE : "trying to allocate more than " + POOL_SIZE + " buffers";
				result = ByteBuffer.allocateDirect(Utils.BUFSIZE);
			}
			assert result.isDirect();
			assert result.position() == 0;
			assert result.hasRemaining();
			assert result.limit() == Utils.BUFSIZE;
			assert tail < POOL_SIZE;
			assert tail >= 0;
			return result;
		}

		/**
		 * Returns the given buffer to the pool, after resetting its position to 0
		 * and its limit to its capacity. The buffer is not retained if the pool is
		 * already full.
		 */
		@Override
		public void recycle(ByteBuffer buffer) {
			assert client.isSelectorThread();
			assert buffer.isDirect();
			assert !buffer.hasRemaining();
			assert tail < POOL_SIZE : "recycle tail is " + tail;
			assert tail >= 0;
			buffer.position(0);
			buffer.limit(buffer.capacity());
			// don't fail if assertions are off. we have asserted above.
			final boolean hasRoom = tail < POOL_SIZE;
			if (hasRoom) {
				pool[tail] = buffer;
				tail += 1;
			}
			assert tail <= POOL_SIZE;
			assert tail > 0;
		}
	}

}